RxJava和Retorfit搭配應可稱是最經典的用法,RxJava善於thread切換,Retrofit呼叫API時經常是背景thread執行並於UI thread更新畫面,兩者供需完全相符。
除了基本的API呼叫以外,今天會用幾個RxJava的Operator操作API的巢狀和合併,就會明顯感受到RxJava的威力。
Retrofit官方已支援RxJava2 Adapter,只要加入dependencies就能把Call變成Observable。
implementation "com.squareup.retrofit2:adapter-rxjava2:2.3.0"
// 如果昨天沒新增RxJava2的話也要加入這些
implementation "io.reactivex.rxjava2:rxjava:2.1.7"
implementation "io.reactivex.rxjava2:rxandroid:2.0.1"
在Retrofit builder中增加adapter。
Retrofit.Builder()
.baseUrl("https://api.github.com/")
.addConverterFactory(GsonConverterFactory.create())
.addCallAdapterFactory(new LiveDataCallAdapterFactory())
.addCallAdapterFactory(RxJava2CallAdapterFactory.create()) // <- This
.build()
.create(GithubService.class);
interface將回傳型態改成Single,選用Single是因為API只會執行一次並且會有回傳值,為與原程式區隔我們method名稱後加上RX。
public interface GithubService {
@GET("search/repositories")
Single<RepoSearchResponse> searchReposRX(@Query("q") String query);
...
}
這樣寫的話API會回傳RepoSearchResponse,但通常我們會需要知道response code,所以加上Retrofit的Response把它包起來,就能同時得到資料和code。
public interface GithubService {
@GET("search/repositories")
Single<Response<RepoSearchResponse>> searchReposRX(@Query("q") String query);
...
}
接著就跟一般連線一樣,在Repository建立方法讓ViewModel呼叫。
public class RepoRepository {
...
public Single<Response<RepoSearchResponse>> searchRepoRX(String query) {
return githubService.searchReposRX(query);
}
}
ViewModel也建立方法讓View使用。
public class RepoViewModel extends ViewModel {
....
Single<Response<RepoSearchResponse>> searchRepoRX(String query) {
return repository.searchRepoRX(query);
}
}
View中的使用方式:
viewModel.searchRepoRX(query)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new DisposableSingleObserver<Response<RepoSearchResponse>>() {
@Override
public void onSuccess(Response<RepoSearchResponse> response) {
int code = response.code();
RepoSearchResponse body = response.body();
// Do something with code and body.
}
@Override
public void onError(Throwable e) {
// Error handle.
}
});
Retrofit會幫我們建立好Observable所以不需再使用fromCallable
等方式,可以直接呼叫searchRepoRX
就進行subscribe,這樣就完成最基本的用法了。
也可以加入Disposable中讓使用者離開畫面時就取消作業。
private CompositeDisposable disposables = new CompositeDisposable();
...
disposables.add(viewModel.searchRepoRX(query)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribeWith(new DisposableSingleObserver<Response<RepoSearchResponse>>() {
@Override
public void onSuccess(Response<RepoSearchResponse> response) {
int code = response.code();
RepoSearchResponse repoSearchResponse = response.body();
}
@Override
public void onError(Throwable e) {
}
}));
...
@Override
public void onDestroyView() {
super.onDestroyView();
disposables.clear();
}
記得要改用subscribeWith
才能加至disposables中,小訣竅可以先把Observable整個寫好,最後才用add()
加入,如果一開始就寫add那Android Studio的自動完成功能有時會怪怪。
不過這樣也許不足以說服人使用RxJava,因為Retrofit的Call寫起來還比較簡潔,接著我們來看比較複雜的情況,就會讓RxJava和Call有明顯差異了。
如果我們要先用API取得User資料後,再發出下一個連線取得其Pet資料,只用Retrofit就會產生巢狀callback:
service.getUser(id).enqueue(new Callback<User>() {
@Override
public void onResponse(Call<User> call, Response<User> response) {
String petId = response.body().getPetId();
service.getPet(petId).enqueue(new Callback<Pet>() {
@Override
public void onResponse(Call<Pet> call, Response<Pet> response) {
// Do something with Pet.
}
@Override
public void onFailure(Call<Pet> call, Throwable t) {
}
}
@Override
public void onFailure(Call<User> call, Throwable t) {
}
});
使用RxJava
用flatMap將原本的Single getUser(id)
轉換成service.getPet(petId)
並繼續往下進行,先看用Android Studio縮起來時的樣子,比較不會亂:
service.getUser(id)
.subscribeOn(Schedulers.io())
.flatMap((Function)(response) -> {
String petId = response.body().getPetId();
return service.getPet(petId);
})
.observeOn(AndroidSchedulers.mainThread())
.subscribeWith(new DisposableSingleObserver<Response<Pet>>() {
@Override
public void onSuccess(Response<Pet> response) {
// Do something with Pet.
}
@Override
public void onError(Throwable e) {
}
});
在flatMap
中取得petId,並以此id進行下一個連線getPet(petId)
,最後subscribe時就會得到目標Pet,全程都不會產生巢狀callback。
service.getPet(petId)
不用指定在io thread因為預設會跟上級的service.getUser(id)
在同一個thread,如果有需要切換的話可以在service.getPet(petId)
使用subscribeOn
。
接著看看沒有縮起來的flatMap
原樣:
service.getUser(id)
.subscribeOn(Schedulers.io())
.flatMap(new Function<Response<User>, SingleSource<Response<Pet>>>() {
@Override
public SingleSource<Response<Pet>> apply(Response<User> response) throws Exception {
String petId = response.body().getPetId();
return service.getPet(petId);
}
})
.observeOn(AndroidSchedulers.mainThread())
.subscribeWith(new DisposableSingleObserver<Response<Pet>>() {
@Override
public void onSuccess(Response<Pet> response) {
// Do something with Pet.
}
@Override
public void onError(Throwable e) {
}
});
乍看很複雜,其實只要注意參數的輸入輸出就好了,如下圖,紅色框的是輸入,Android Studio會自動產生所以不要緊,我們須完成的是綠色框,表示輸出,只要看service.getPet(petId)
的回傳型態是什麼,此例而言是Response<Pet>
,把它填到綠色框的位置,就完成flatMap
設置了。
在一個流程中flatMap
可以使用多次,就算User要再經過轉換才能拿到petId也沒關係,只要中間多一個flatMap
就好。
那以我們的程式而言,搜尋repo的API會回傳總數和一個repo列表,我們希望依照這repo列表再發出API得到每一個repo的User詳細資訊,可以這樣寫:
viewModel.searchRepoRX(query)
.subscribeOn(Schedulers.io())
.flatMap(new Function<Response<RepoSearchResponse>, ObservableSource<Repo>>() {
@Override
public ObservableSource<Repo> apply(Response<RepoSearchResponse> response) throws Exception {
List<Repo> repos = response.body().getItems();
return Observable.fromIterable(repos);
}
})
.flatMap(new Function<Repo, ObservableSource<Response<User>>>() {
@Override
public ObservableSource<Response<User>> apply(Repo repo) throws Exception {
return viewModel.getUser(repo.owner.login);
}
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new DisposableObserver<Response<User>>() {
@Override
public void onNext(Response<User> response) {
// Get an user's information.
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
第一個flatMap
將API回應的repo列表透過fromIterable
轉成Observable,由於Observab的特性是會執行多次,如前一篇所敘適合用在迴圈類型。
第二個flatMap
就依照前一個轉好的repo列表陸續執行getUser
連線,最後subscribe時的onNext
就是API回傳的各個User資料了。
可以注意的是flatMap
不一定會依照repo的順序來取得User,即repo列表有1~5的話,flatMap
取得User的順序有可能是12534,如果要確保依照repo的順序取得User可改用concatMap
,用法跟flatMap
完全一樣。
註一:為了簡化及方便上手,上例我們直接將interface中的回傳型態由Single改成Observable,而不在流程中使用toObservable()
做轉換。
@GET("search/repositories")
Observable<Response<RepoSearchResponse>> searchReposRX(@Query("q") String query);
@GET("users/{login}")
Observable<Response<User>> getUser(@Path("login") String login);
Zip可以讓我們合併多個Observable,並在全部執行完時才呼叫callback。
例如getUser
和getPet
都須完成才要進行下一步,就可以用Zip包起來,zip會等到兩個連線都完成了才進入callback:
Observable.zip(service.getUser(userId), service.getPet(petId),
new BiFunction<Response<User>, Response<Pet>, UserAndPet>() {
@Override
public UserAndPet apply(Response<User> response, Response<Pet> response2) throws Exception {
User user = response.body();
Pet pet = response2.body();
return new UserAndPet(user, pet);
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new DisposableObserver<UserAndPet>() {
@Override
public void onNext(UserAndPet userAndPet) {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
BiFunction
的三個參數分別代表getUser
的回傳值、getPet
的回傳值、以及兩者合併的結果UserAndPet,其中UserAndPet只是一個POJO。
public class UserAndPet {
public User user;
public Pet pet;
public UserAndPet(User user, Pet pet) {
this.user = user;
this.pet = pet;
}
}
Zip讓我們避免連線的時間差問題,即便getUser
連線速度比getPet
快一倍我們也不用自行實作等待機制。
今天介紹了RxJava搭配Retrofit,其實Retrofit一直不是系列文章的重點,但這組搭配是非常好的RxJava練習對象,包括thread切換以及使用Operator處理回傳資料、串聯多個連線等操作都能發揮RxJava的精神,很多教學文以及我自己入門RxJava也是這樣開始的,反正建立新的連線method來練習,失敗也不虧。
另外想強調一下Operator並非只適合用於API連線,任何的Observable都可以用Operator,要讓method延遲特定時間再執行可以用Timer
,要過濾列表中符合條件的物件可以用Filter
,族繁不及備載,可參考官方文件或這個sample找想要的Operator來用。
GitHub source code:
https://github.com/IvanBean/ITBon2018/tree/day26-rxjava2-retrofit